datastreams: Port data-streams-go to dd-trace-go #2006
Conversation
Benchmarks (execution time: 2023-09-06 13:45:30): comparing candidate commit 6341dc6 in the PR branch found 0 performance improvements and 0 performance regressions. Performance is the same for 40 metrics; 1 unstable metric.
Force-pushed from c1fac6c to 0448e4d.
As discussed offline, some core functionalities (like container ID parsing and config) need to be refactored first.
Hey @piochelepiotr! Any update on this PR? It's been 2 months since the review; we'll probably consider it stale and close it if there is no progress in the next few weeks. Thanks!
Force-pushed from 0448e4d to 3d8ee19.
Force-pushed from 7735514 to 386f073.
Force-pushed from d9f9cfc to ca184db.
👋
I left an initial review, almost exclusively on how it integrates with the existing tracer.
I'll still have to look more closely at the data streams package itself.
I would like to understand better what this is, why dd-trace-go is a good place for it, and why it requires changes to the tracer itself, whereas previously it was able to exist as a stand-alone module.
Thanks for the review! The reason to put everything here is that the next step is to use this library in all the integrations. The reason it requires changes to the tracer is that I want tracer.Start() to also start the data streams processor.
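As a rough illustration of that intent (a sketch, not code from the PR), the single entry point could look like the following for an application. It relies on the SetDataStreamsCheckpoint signature quoted later in this thread; the edge tags and the mechanism for enabling data streams are assumptions.

```go
package main

import (
	"context"

	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func main() {
	// A single Start call is intended to bring up both the tracer and the
	// data streams processor. How data streams is switched on (a start
	// option or an environment variable) is assumed, not shown in this excerpt.
	tracer.Start()
	defer tracer.Stop()

	// Record a produce checkpoint on a pathway. The returned context carries
	// the pathway so a downstream consume checkpoint can be chained to it.
	// The edge tags here are illustrative.
	_, ctx, ok := tracer.SetDataStreamsCheckpoint(
		context.Background(), "direction:out", "topic:orders", "type:kafka")
	if ok {
		_ = ctx // propagate ctx (for example via message headers) to consumers
	}
}
```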
I focused on the changes outside the new datastreams folder. Overall I think this seems like a good path forward to simplifying things for customers!
Force-pushed from 3e2fccc to bb750a9.
Definitely a bit nicer for customers to reduce the number of interfaces needed here! Just a few smaller change requests / questions :)
Some comments to consider, but overall this is a 👍 from me.
ddtrace/tracer/data_streams.go (outdated diff):

```go
// SetDataStreamsCheckpoint sets a consume or produce checkpoint in a Data Streams pathway.
// This enables tracking data flow & end to end latency.
// To learn more about the data streams product, see: https://docs.datadoghq.com/data_streams/go/
func SetDataStreamsCheckpoint(ctx context.Context, edgeTags ...string) (p datastreams.Pathway, outCtx context.Context, ok bool) {
	return SetDataStreamsCheckpointWithParams(ctx, datastreams.NewCheckpointParams(), edgeTags...)
}

// SetDataStreamsCheckpointWithParams sets a consume or produce checkpoint in a Data Streams pathway.
// This enables tracking data flow & end to end latency.
// To learn more about the data streams product, see: https://docs.datadoghq.com/data_streams/go/
func SetDataStreamsCheckpointWithParams(ctx context.Context, params datastreams.CheckpointParams, edgeTags ...string) (p datastreams.Pathway, outCtx context.Context, ok bool) {
	if t, ok := internal.GetGlobalTracer().(datastreams.ProcessorContainer); ok {
		if processor := t.GetDataStreamsProcessor(); processor != nil {
			p, outCtx = processor.SetCheckpointWithParams(ctx, params, edgeTags...)
			return p, outCtx, true
		}
	}
	return datastreams.Pathway{}, ctx, false
}

// TrackKafkaCommitOffset should be used in the consumer, to track when it acks offset.
// if used together with TrackKafkaProduceOffset it can generate a Kafka lag in seconds metric.
func TrackKafkaCommitOffset(group, topic string, partition int32, offset int64) {
	if t, ok := internal.GetGlobalTracer().(datastreams.ProcessorContainer); ok {
		if p := t.GetDataStreamsProcessor(); p != nil {
			p.TrackKafkaCommitOffset(group, topic, partition, offset)
		}
	}
}

// TrackKafkaProduceOffset should be used in the producer, to track when it produces a message.
// if used together with TrackKafkaCommitOffset it can generate a Kafka lag in seconds metric.
func TrackKafkaProduceOffset(topic string, partition int32, offset int64) {
```
This looks fine to me, but to be clear, this is becoming a public API, even though we are really only using it within the module right now, and we don't really expect users to directly use it.
The consequence of this is that this API cannot change at all once we release it.
If this is not a stable, complete API, and is still being developed at all, it may be better to find some way to hide it so it's not exposed publicly.
Yeah, given that not everyone uses the sarama or confluent clients, and not everyone stores offsets in Kafka, I think it's important to leave that option to customers.
The main change I see for this API is an additional argument (Kafka cluster) that we might add in the future.
Maybe I can add it directly, but ignore it for now?
Actually, I don't know what that API would look like today: maybe a list of strings, maybe just one string.
So I think what we can do is leave this function as it is, and add another one once we know how we will track clusters.
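For readers following along, here is a minimal sketch of how a producer and consumer that use neither the sarama nor the confluent integration might call these helpers directly. The callback names and wiring are hypothetical; only the two tracer calls come from the API quoted above.

```go
package kafkaapp

import "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"

// onProduceAck would be wired into whatever produce-acknowledgement hook the
// Kafka client exposes (hypothetical; clients differ here).
func onProduceAck(topic string, partition int32, offset int64) {
	tracer.TrackKafkaProduceOffset(topic, partition, offset)
}

// onOffsetsCommitted would be wired into the consumer's offset-commit path.
// Reporting both sides lets the backend derive a Kafka lag-in-seconds metric.
func onOffsetsCommitted(group, topic string, partition int32, offset int64) {
	tracer.TrackKafkaCommitOffset(group, topic, partition, offset)
}
```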
datastreams/context.go (outdated diff):

```go
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

package datastreams
```
Have you considered putting the public types and functions in a datastreams/internal package instead of exposing everything to users in the current datastreams package? By having an internal package you'll be able to change it freely in the future without any backward-compatibility restrictions. This is also a common practice in dd-trace-go; see ddtrace/internal, profiler/internal and contrib/internal.
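A rough sketch of the suggested split; the file names and the choice of what moves into the internal package are hypothetical, not the PR's actual layout.

```go
// Hypothetical layout:
//
//   datastreams/           public, intentionally small surface
//   datastreams/internal/  implementation details
//
// datastreams/internal/pathway.go
package internal

// Pathway (and the processor machinery around it) can keep evolving here:
// the Go toolchain rejects imports of an internal package from outside the
// tree rooted at its parent directory, so external modules can never depend
// on it directly.
type Pathway struct {
	// hash, timestamps and other fields that may still change.
}

// The public package then re-exports only what must stay stable, e.g.:
//
//   // datastreams/pathway.go
//   package datastreams
//
//   import "gopkg.in/DataDog/dd-trace-go.v1/datastreams/internal"
//
//   type Pathway = internal.Pathway
```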
Force-pushed from 07a9894 to 6b36a9c.
Pulled the changes from DataDog#2006 into the feature branch
What does this PR do?
Move the repository https://github.com/DataDog/data-streams-go into the main Datadog APM Go repository.
This way, the two can be better integrated and users only need to install one library.